Conversation

Contributor

@lihaosky lihaosky commented Oct 13, 2025

What is the purpose of the change

Add Table API support for Model and the ML_PREDICT function, as described in FLIP-526: https://cwiki.apache.org/confluence/display/FLINK/FLIP-526%3A+Model+ML_PREDICT%2C+ML_EVALUATE+Table+API

Brief change log

  • Add a new Model interface and a ModelImpl implementation for models and ml_predict (a usage sketch follows below)
  • Add fromModelPath and from to construct a Model from a TableEnvironment
  • Add ModelReferenceExpression and handle it in QueryOperationConverter
  • Add anonymous model support to ContextResolvedModel
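
For orientation, here is a minimal usage sketch of the API this change adds. fromModelPath and from(ModelDescriptor) come from the change log above; the predict(...) call and all identifiers are assumptions for illustration only, not the confirmed FLIP-526 surface.

  // Minimal sketch; model/table names and the predict(...) method are assumed.
  TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

  // Construct a Model from a catalog path (added in this PR).
  Model model = tEnv.fromModelPath("my_catalog.my_db.my_model");

  // Run ML_PREDICT on an input table through the Table API; predict(...) here
  // stands in for whatever method the FLIP finally exposes on Model.
  Table scored = model.predict(tEnv.from("input_table"));
  scored.execute().print();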

Verifying this change

Unit and integration tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

Collaborator

flinkbot commented Oct 13, 2025

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@github-actions bot added the community-reviewed label (PR has been reviewed by the community) on Oct 22, 2025
Member

@fsk119 fsk119 left a comment

Thanks for your contribution. I left some comments.

Member

@fsk119 fsk119 left a comment

I left some comments

throw new ValidationException("Anonymous models cannot be serialized.");
}

return "MODEL " + model.getIdentifier().asSerializableString();
Member

It seems the name is lost after serialization. I am not sure whether we need to restore the object from the string.

Contributor Author

The name is not used for TableReferenceExpression or for literal argument names either; these are not serialized as named argument calls. I don't see the object being restored from the serialized string, it looks like it is mainly used to convert the operation into a SQL query.
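
For reference, with the code above a model registered as my_catalog.my_db.my_model (made-up names) would serialize to roughly:

  MODEL `my_catalog`.`my_db`.`my_model`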

Member

Do you think we need to add a case to verify:

  1. whether we can store a call with a model reference as a view via org.apache.flink.table.api.TableEnvironment#createView(java.lang.String, org.apache.flink.table.api.Table)
  2. whether we can restore the view and run tests? (a rough sketch of this round-trip follows below)
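
A rough sketch of that round-trip; the view name, the source table, and the predict(...) call are illustrative assumptions:

  // 1. Store the ML_PREDICT call as a view.
  Table predicted = model.predict(tEnv.from("input_table"));
  tEnv.createView("predicted_view", predicted);

  // 2. Restore the view by name and run it.
  tEnv.from("predicted_view").execute().collect();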

Contributor Author

I added an IT test for 1. I'm not sure what you mean by 2. Do you mean a Table API restore test? I don't see restore tests supporting the Table API, though.

Contributor

We should add a test to QueryOperationSqlSerializationTest.

}
final RexTableArgCall tableArgCall =
new RexTableArgCall(rowType, inputStack.size(), partitionKeys, new int[0]);
inputStack.add(relBuilder.build());
Member

Can we maintain the stack and do the relBuilder.build() in the QueryOperationConverter?

Contributor Author

It's hard to do since the inputStack depends on this call. Maybe that's why TableReferenceExpression was handled in QueryOperationConverter in the first place.

Member

Can we do it like this?

  Collections.reverse(resolvedArgs);
  final List<RelNode> inputStack = new ArrayList<>();
  final List<RexNode> rexNodeArgs =
          resolvedArgs.stream()
                  .map(QueryOperationConverter.this::convertExprToRexNode)
                  .peek(
                          expr -> {
                              if (expr instanceof RexTableArgCall) {
                                  inputStack.add(relBuilder.build());
                              }
                          })
                  .collect(Collectors.toList());
  Collections.reverse(rexNodeArgs);

And in the ExpressionConverter:

    @Override
    public RexNode visit(TableReferenceExpression tableRef) {
        final LogicalType tableArgType = tableRef.getOutputDataType().getLogicalType();
        final RelDataType rowType = typeFactory.buildRelNodeRowType((RowType) tableArgType);
        final int[] partitionKeys;
        if (tableRef.getQueryOperation() instanceof PartitionQueryOperation) {
            final PartitionQueryOperation partitionOperation =
                    (PartitionQueryOperation) tableRef.getQueryOperation();
            partitionKeys = partitionOperation.getPartitionKeys();
        } else {
            partitionKeys = new int[0];
        }
        final RexTableArgCall tableArgCall =
                new RexTableArgCall(rowType, relBuilder.size() - 1, partitionKeys, new int[0]);
        //        inputStack.add(relBuilder.build());
        return tableArgCall;
    }

After the change, I find that no test in ProcessTableFunctionSemanticTests fails.

I think the logic for ptf(table t1, table t2, arg) is:

  1. When building the FunctionQueryOperation, the planner first pushes all operands used in the RelNode tree onto the stack. As a result, the stack head is t2, followed by t1.
  2. We then parse the operands in reverse order. For operand table t2, the current stack size is 2, so we can build a RexTableArgCall that refers to the stack head. After visiting, we pop the stack head.

WDYT?

Contributor Author

It looks like we still need to handle RexTableArgCall specially here. How about we leave the original logic for TableReferenceExpression and just move ModelReferenceExpression to ExpressionConverter?

Member

Okay. Can you open a ticket to track this problem?

Member

@fsk119 fsk119 left a comment

LGTM

Contributor

@twalthr twalthr left a comment

Great work @lihaosky. I added a last set of comments to ensure the code stays consistent with other locations.

*
* <p>The data type will be {@link DataTypes#DESCRIPTOR()}.
*/
public static ApiExpression descriptor(ColumnList columnList) {
Contributor

This method is unnecessary. It is like a "descriptor in a descriptor". You can just directly pass a ColumnList to any method that takes expressions.

* @param modelPath The path of the model in the catalog.
* @return The {@link Model} object describing the model resource.
*/
Model fromModelPath(String modelPath);
Contributor

nit: I'm still wondering whether we should call this fromModel. It would align better with fromCall.

* @param descriptor The {@link ModelDescriptor} describing the model resource.
* @return The {@link Model} object representing the model resource.
*/
Model from(ModelDescriptor descriptor);
Contributor

Call this fromModel to distinguish it from the heavily overloaded from() method. We should reserve from() for tables.

// lit() is not serializable to sql.
if (options.isEmpty()) {
return tableEnvironment.fromCall(
BuiltInFunctionDefinitions.ML_PREDICT.getName(),
Contributor

We don't need to call getName here. Ideally we can just use ApiExpressionUtils.unresolvedCall and the operationTreeBuilder and pass the FunctionDefinition directly, i.e. go one level deeper. This avoids a catalog lookup for the function name; catalog lookups are expensive.
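
A hedged sketch of that idea; tableArg, modelArg, and descriptorArg are placeholder Expression variables, and the wiring into the operation tree builder is omitted:

  // Build the call directly from the FunctionDefinition instead of resolving
  // the function name through the catalog (the argument expressions are placeholders).
  UnresolvedCallExpression mlPredictCall =
          ApiExpressionUtils.unresolvedCall(
                  BuiltInFunctionDefinitions.ML_PREDICT, tableArg, modelArg, descriptorArg);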

Contributor Author

@lihaosky lihaosky Nov 7, 2025

TableEnvironment.fromCall has additional checks, like

tableReferenceChecker.check(arguments);

which seem useful, though.

}

@Override
public org.apache.flink.table.api.Model fromModel(String modelPath) {
Contributor

Resolve all imports in this class:

Suggested change
public org.apache.flink.table.api.Model fromModel(String modelPath) {
public Model fromModel(String modelPath) {

flinkContext.getClassLoader(),
contextResolvedModel.isTemporary());
final LogicalType modelOutputType =
DataTypeUtils.fromResolvedSchemaPreservingTimeAttributes(
Contributor

Time attributes are not a topic for models.

public abstract class FunctionCallUtil {

private static final String CONFIG_ERROR_MESSAGE =
"Config parameter should be a MAP data type consisting String literals.";
Contributor

Suggested change
"Config parameter should be a MAP data type consisting String literals.";
"Config parameter should be a MAP data type consisting of string literals.";

.setupTableSink(SIMPLE_SINK)
.setupConfig(
ExecutionConfigOptions.TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE,
ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED)
Contributor

nit: simplify code

Suggested change
ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED)
AsyncOutputMode.ALLOW_UNORDERED)

}

@Test
public void testPredictTableApiWithView() {
Contributor

Is this test still necessary after all semantic tests?

Contributor Author

This addresses one of Shengkai's comments to test createView with the Table API.
